/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.fnexecution.wire;



import com.bff.gaia.unified.model.pipeline.v1.RunnerApi;

import com.bff.gaia.unified.runners.core.construction.ModelCoders;

import com.bff.gaia.unified.runners.core.construction.RehydratedComponents;

import com.bff.gaia.unified.runners.core.construction.SyntheticComponents;

import com.bff.gaia.unified.runners.core.construction.graph.PipelineNode.PCollectionNode;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.util.WindowedValue;

import com.bff.gaia.unified.sdk.util.WindowedValue.FullWindowedValueCoder;



import java.io.IOException;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/** Helpers to construct coders for gRPC port reads and writes. */

public class WireCoders {

  /**

   * Creates an SDK-side wire coder for a port read/write for the given PCollection. Coders that are

   * unknown to the runner are wrapped with length-prefix coders. The inner element coders are kept

   * intact so that SDK harnesses can reconstruct the original elements.

   *

   * <p>Adds all necessary coders to the components builder.

   *

   * @return id of a windowed value coder containing the PCollection's element coder

   */

  public static String addSdkWireCoder(

      PCollectionNode pCollectionNode, RunnerApi.Components.Builder components) {

    return addWireCoder(pCollectionNode, components, false);

  }



  /**

   * Creates a runner-side wire coder for a port read/write for the given PCollection. Unknown

   * coders are replaced with length-prefixed byte arrays.

   *

   * <p>Adds all necessary coders to the components builder.

   *

   * @return id of a windowed value coder containing the PCollection's element coder

   */

  public static String addRunnerWireCoder(

      PCollectionNode pCollectionNode, RunnerApi.Components.Builder components) {

    return addWireCoder(pCollectionNode, components, true);

  }



  /**

   * Instantiates a runner-side wire coder for the given PCollection. Any component coders that are

   * unknown by the runner are replaced with length-prefixed byte arrays.

   *

   * @return a windowed value coder containing the PCollection's element coder

   */

  public static <T> Coder<WindowedValue<T>> instantiateRunnerWireCoder(

      PCollectionNode pCollectionNode, RunnerApi.Components components) throws IOException {

    // NOTE: We discard the new set of components so we don't bother to ensure it's consistent with

    // the caller's view.

    RunnerApi.Components.Builder builder = components.toBuilder();

    String protoCoderId = addRunnerWireCoder(pCollectionNode, builder);

    Coder<?> javaCoder = RehydratedComponents.forComponents(builder.build()).getCoder(protoCoderId);

    checkArgument(

        javaCoder instanceof WindowedValue.FullWindowedValueCoder,

        "Unexpected Deserialized %s type, expected %s, got %s",

        RunnerApi.Coder.class.getSimpleName(),

        FullWindowedValueCoder.class.getSimpleName(),

        javaCoder.getClass());

    return (Coder<WindowedValue<T>>) javaCoder;

  }



  private static String addWireCoder(

      PCollectionNode pCollectionNode,

      RunnerApi.Components.Builder components,

      boolean useByteArrayCoder) {

    String elementCoderId = pCollectionNode.getPCollection().getCoderId();

    String windowingStrategyId = pCollectionNode.getPCollection().getWindowingStrategyId();

    String windowCoderId =

        components.getWindowingStrategiesOrThrow(windowingStrategyId).getWindowCoderId();

    RunnerApi.Coder windowedValueCoder =

        ModelCoders.windowedValueCoder(elementCoderId, windowCoderId);

    // Add the original WindowedValue<T, W> coder to the components;

    String windowedValueId =

        SyntheticComponents.uniqueId(

            String.format("fn/wire/%s", pCollectionNode.getId()), components::containsCoders);

    components.putCoders(windowedValueId, windowedValueCoder);

    return LengthPrefixUnknownCoders.addLengthPrefixedCoder(

        windowedValueId, components, useByteArrayCoder);

  }



  // Not instantiable.

  private WireCoders() {}

}